package dao.elasticsearch;

import dao.BaseMapper;
import model.elasticsearch.Tokenizer;
import model.elasticsearch.BaseElasticSearchModel;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.highlight.HighlightField;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.*;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static util.ModelUtils.convert;
import static util.Utils.isCollectionEmpty;

/**
 * Created by xiaoyou on 16-7-4.
 */
public class BaseElasticSearchMapper<T extends BaseElasticSearchModel> implements BaseMapper<T> {

    private ElasticsearchTemplate elasticsearchTemplate;

    //Class info
    private Class<T> modelClass;

    private Document document;

    private String indexName;

    private String typeName;

    public void setElasticsearchTemplate(ElasticsearchTemplate elasticsearchTemplate) {
        this.elasticsearchTemplate = elasticsearchTemplate;
        afterPropertySet();
    }

    @Override
    public void insert(T entity) {
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withIndexName(indexName)
                .withType(typeName)
                .withId(entity.getId())
                .withObject(entity).build();
        elasticsearchTemplate.index(indexQuery);
    }

    @Override
    public List<T> getAll() throws Exception {
        SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
        Page<T> results = elasticsearchTemplate.queryForPage(searchQuery, modelClass);
        return results.getContent();
    }

    @Override
    public T get(final String id) {
        QueryBuilder queryBuilder = new BoolQueryBuilder();
        NativeSearchQuery nativeSearchQuery = new NativeSearchQuery(queryBuilder);
        nativeSearchQuery.setIds(new ArrayList<String>(){{
            add(id);
        }});
        List<T> results = elasticsearchTemplate.multiGet(nativeSearchQuery, modelClass);
        if(isCollectionEmpty(results)){
            throw new RuntimeException("no entity id [" + id + "].");
        }
        return results.get(0);
    }

    @Override
    public void update(String id, T entity) {
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withIndexName(indexName)
                .withType(typeName)
                .withId(id)
                .withObject(entity).build();
        elasticsearchTemplate.index(indexQuery);
    }

    @Override
    public void delete(String id) {
//        DeleteResponse deleteResponse =
//                elasticsearchTemplate.getClient().prepareDelete(indexName, typeName, id).execute().actionGet();
        elasticsearchTemplate.delete(modelClass, id);
    }

    @Override
    public void bulkInsert(List<T> entities) {
        List<IndexQuery> indexQueries = new ArrayList<>();
        for (T entity: entities) {
            indexQueries.add(
                    new IndexQueryBuilder()
                            .withIndexName(indexName)
                            .withType(typeName)
                            .withId(entity.getId())
                            .withObject(entity).build()
            );
        }
        elasticsearchTemplate.bulkIndex(indexQueries);
    }

    @Override
    public void bulkDelete(List<String> ids) {
        BulkRequestBuilder bulkRequest = elasticsearchTemplate.getClient().prepareBulk();
        for (String id: ids) {
            bulkRequest.add(
                    elasticsearchTemplate.getClient()
                            .prepareDelete(indexName, typeName, id)
            );
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if(bulkResponse.hasFailures()){
            //TODO: delete fail
        }
    }

    public List<String> analyze(String source, Tokenizer tokenizer) {
        final AnalyzeResponse analyzeResponse = elasticsearchTemplate.getClient().admin().indices()
                .prepareAnalyze(indexName, source).setAnalyzer(tokenizer.getTitle()).get();
        return new ArrayList<String>(){{
            for (AnalyzeResponse.AnalyzeToken token : analyzeResponse.getTokens()) {
                add(token.getTerm());
            }
        }};
    }

    public List<T> search(SearchSourceBuilder builder) throws Exception {
        SearchResponse searchResponse = elasticsearchTemplate.getClient().prepareSearch(indexName).setTypes(typeName).setSource(builder.toString()).get();
        SearchHit[] searchHits = searchResponse.getHits().hits();
        return parseSearchHits(searchHits);
    }

    private List<T> parseSearchHits(final SearchHit[] searchHits) throws Exception {
        return new ArrayList<T>(){{
            for (SearchHit hit : searchHits) {
                final T entity = convert(modelClass, hit.getSource());
                entity.setScore(hit.getScore());
                final Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                entity.setHighlightFields(new HashMap<String, String>(){{
                    for (Map.Entry<String, HighlightField> entry : highlightFields.entrySet()) {
                        put(entry.getKey(), entry.getValue().getFragments()[0].string());
                    }
                }});
                add(entity);
            }
        }};
    }

    private void afterPropertySet() {
        modelClass = (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        document = modelClass.getAnnotation(Document.class);
        indexName = document.indexName();
        typeName = document.type();
        if (!elasticsearchTemplate.indexExists(indexName)) {
            elasticsearchTemplate.createIndex(modelClass);
            elasticsearchTemplate.putMapping(modelClass);
            elasticsearchTemplate.refresh(modelClass);
        }
    }

}
